feat(flashblocks): shift atomic gossip op, wspub on seq after p2p gossip #210
louisliu2048 merged 12 commits into main
Conversation
🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
@claude review this PR
Claude finished @sieniven's task in 3m 31s — PR Review: feat(flashblocks): shift atomic gossip op, wspub on seq after p2p gossip
Summary
Good architectural direction.
🔴 Critical: WS publishes before p2p gossip.
… flag

Rename the p2p/ module to broadcast/ to better reflect its dual role (p2p gossip + WS publishing). Move wspub.rs into broadcast/, remove the p2p_enabled config flag so the broadcast node always runs, fix WS publish ordering to run after p2p gossip, and improve error handling so ws_pub failures warn instead of crashing the node. Update stale comments referencing the removed p2p_enabled flag.
@claude please help to review again, and check that all previously mentioned points have been resolved
Claude finished @sieniven's task in 2m 13s — PR Re-Review: feat(flashblocks): shift atomic gossip op, wspub on seq after p2p gossip
All three previously raised issues have been resolved. Details below.
✅ Fixed: WS publish ordering (was 🔴 Critical)
```rust
if let Err(e) = outgoing_streams_handler.broadcast_message(message.clone()).await {
    warn!(...) // p2p gossip happens first
}
if let Message::OpFlashblockPayload(ref fb_payload) = message {
    match ws_pub.publish(fb_payload) { ... } // WS publish happens second
}
```
✅ Fixed:
…ntics
When `broadcast_message` fails to write to a peer's stream, the stream was silently dropped but the peer entry remained in `StreamsHandler`, causing every subsequent broadcast to emit a `no stream for protocol` warning indefinitely — confirmed in production logs. Two-part fix:
- `broadcast_message` now carries the `PeerId` through the error path, evicts the peer from the map on failure, and returns the list of failed peers to the caller instead of swallowing the error.
- `Node::run` immediately attempts `open_stream` for each failed peer after every broadcast, recovering the yamux stream on the existing TCP connection without waiting for the 60 s retry tick.

Also adds two unit tests in `mod.rs` that drive real libp2p swarms: `broadcast_evicts_peer_on_stream_failure` and `broadcast_returns_empty_failed_peers_on_success`.
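A minimal sketch of the evict-and-report pattern described above, with simplified stand-in types (the real `StreamsHandler` wraps libp2p peers and yamux streams; here peers map to fallible write closures and `u64` ids):

```rust
use std::collections::HashMap;

// Stand-ins for illustration only; the real code uses libp2p's PeerId
// and yamux streams rather than u64 keys and closures.
type PeerId = u64;
type Stream = Box<dyn FnMut(&[u8]) -> Result<(), String>>;

struct StreamsHandler {
    streams: HashMap<PeerId, Stream>,
}

impl StreamsHandler {
    /// Broadcast `msg` to every peer. Instead of swallowing write errors,
    /// evict each failed peer from the map (so later broadcasts stop
    /// warning about a dead stream) and return the failed ids so the
    /// caller can retry `open_stream` immediately.
    fn broadcast_message(&mut self, msg: &[u8]) -> Vec<PeerId> {
        let mut failed_peers = Vec::new();
        for (&peer_id, send) in self.streams.iter_mut() {
            if send(msg).is_err() {
                failed_peers.push(peer_id);
            }
        }
        for &peer_id in &failed_peers {
            self.streams.remove(&peer_id);
        }
        failed_peers
    }
}

fn main() {
    let mut handler = StreamsHandler { streams: HashMap::new() };
    handler.streams.insert(1, Box::new(|_: &[u8]| -> Result<(), String> { Ok(()) }));
    handler.streams.insert(2, Box::new(|_: &[u8]| -> Result<(), String> { Err("broken pipe".into()) }));

    let failed = handler.broadcast_message(b"flashblock");
    assert_eq!(failed, vec![2]);          // peer 2's failure is reported, not swallowed
    assert_eq!(handler.streams.len(), 1); // and peer 2 is evicted from the map
    assert!(handler.broadcast_message(b"flashblock").is_empty());
}
```

Returning the failed ids (rather than a bare `Result`) is what lets the caller distinguish "retry these peers" from "all good" without re-scanning the map.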
Speed up P2P peer reconnection by lowering the default retry interval, improving flashblocks broadcast recovery after transient disconnects.
Replace test-only StreamProtocol constants with the actual FLASHBLOCKS_STREAM_PROTOCOL to ensure tests exercise the same protocol negotiation as production code.
Namespace all WebSocketPublisher log messages under payload_builder::broadcast for easier filtering and log segregation.
@claude review the PR again
Claude finished @sieniven's task in 2m 35s — PR Review: feat(flashblocks): shift atomic gossip op, wspub on seq after p2p gossip
All three issues from the first review were confirmed fixed in the second review. This third pass verifies nothing has regressed and checks the latest commits.
✅ All previous issues remain fixed
- WS ordering
- Error propagation
- Metric coverage
New findings from latest commits
One observation (non-blocking)
```rust
Some(payload) = built_fb_payload_rx.recv() => {
    let _ = p2p_tx.send(Message::from_flashblock_payload(payload)).await;
}
```
Data flow correctness (end-to-end)
No double-publish between paths (outgoing vs. incoming channels are distinct).
✅ LGTM. The PR achieves its stated goal cleanly. The design trade-off (no delivery ACK from peers, the TCP kernel buffer as the "happens-before" boundary) is clearly documented in comments and is a reasonable choice for low-latency flashblock streaming.
If `broadcast_message` returns `Err` (message serialization failure), skip publishing the flashblock to WebSocket subscribers via `continue`. Previously the ws publish ran unconditionally after the match block.
Upholds the reorg-safety invariant from PR #210: WebSocket publish to RPC subscribers only happens if all follower sequencers received the flashblock via P2P. If any peer's stream send failed, that peer is a valid leader candidate without the flashblock — publishing to WS would cause a reorg on sequencer switch.
Replace the pre-loop `has_failed_peers` flag with iteration by reference (`for &peer_id in &failed_peers`), keeping the vec accessible after the loop so emptiness can be checked directly. PeerId is Copy so the destructuring pattern is idiomatic.
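As a standalone illustration of that pattern (with `u64` as a stand-in id; the real libp2p `PeerId` is also `Copy`, as the commit notes):

```rust
fn main() {
    // Iterating by reference with a `&peer_id` destructuring pattern copies
    // each id out of the borrow, so the vec is NOT consumed by the loop.
    let failed_peers: Vec<u64> = vec![7, 9]; // u64 stands in for libp2p::PeerId
    let mut retried = Vec::new();
    for &peer_id in &failed_peers {
        retried.push(peer_id); // the PR would call open_stream(peer_id) here
    }
    // Emptiness can still be checked after the loop; no pre-loop flag needed.
    assert!(!failed_peers.is_empty());
    assert_eq!(retried, failed_peers);
}
```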
@claude review again
Claude finished @sieniven's task in 1m 48s — PR Re-Review (4th pass): feat(flashblocks): shift atomic gossip op, wspub on seq after p2p gossip
All previously raised issues remain fixed. This pass reviews the 5 newest commits added since the last review.
✅ All previous fixes still hold
- WS ordering
- Error propagation
- Metric coverage
- Broadcast node always spawned
New commits analysis
```rust
if !failed_peers.is_empty() {
    continue; // skip WS publish entirely
}
```
This tightens the atomicity guarantee: WS publish only fires when ALL peers successfully received the flashblock. One flashblock per stream failure is dropped for WS subscribers. The trade-off is intentional and consistent with the PR's zero-reorg goal.
```rust
Err(e) => {
    warn!(...);
    continue; // skip WS publish on serialize error
}
```
Consistent with the "p2p-first" design — serialization failures now also suppress WS publish. Correct.
```rust
for &peer_id in &failed_peers {
```
Clean refactor. Avoids consuming `failed_peers`, so the vec remains usable after the loop.
One observation (non-blocking)
Stream recovery calls `open_stream` for each failed peer immediately after the broadcast.
Data flow (still correct end-to-end)
No double-publish between paths.
✅ LGTM. The latest commits correctly tighten the atomicity guarantee (skip WS on any peer failure) and clean up minor refactors. The PR achieves its stated goal cleanly.
Summary
The PR ensures zero reorg on subscribed flashblocks RPC nodes by making p2p gossip to follower sequencers a blocking operation that must succeed before the flashblocks are gossiped to RPC nodes. This makes flashblock replay atomic across sequencer switches or leader-sequencer failures: replayed flashblocks stay consistent across RPC nodes, so flashblocks on RPC nodes are protected from reorgs. Note that the WebSocket publishing logic has been moved into the new broadcast submodule of the xlayer-builder crate.
In addition, on the p2p node:
Reorg considerations
If the p2p broadcast fails at the transport layer, the guarantee is weaker: the blocking broadcast to peers only confirms the write at the application level, not delivery to the peer, so a reorg is still possible in that window.
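Putting the review excerpts together, the p2p-first gate on the sequencer side can be sketched as follows. This is a simplified stand-in (names follow the review snippets; `WsPub` and the failed-peer slice are illustrative, not the real types):

```rust
// Simplified model of the p2p-first publish loop from this PR: WS publish to
// RPC subscribers fires only when every follower peer received the flashblock.
struct WsPub {
    published: Vec<String>,
}

impl WsPub {
    fn publish(&mut self, payload: &str) {
        self.published.push(payload.to_string());
    }
}

/// One loop iteration. `failed_peers` models the return value of
/// `broadcast_message`; any failure means a valid leader candidate may lack
/// this flashblock, so publishing to WS could reorg RPC nodes on a switch.
fn handle_flashblock(failed_peers: &[u64], ws_pub: &mut WsPub, payload: &str) {
    if !failed_peers.is_empty() {
        return; // skip WS publish entirely (the `continue` in the real loop)
    }
    ws_pub.publish(payload);
}

fn main() {
    let mut ws = WsPub { published: Vec::new() };
    handle_flashblock(&[42], &mut ws, "fb-1"); // a peer failed: WS suppressed
    handle_flashblock(&[], &mut ws, "fb-2");   // all peers ok: WS published
    assert_eq!(ws.published, vec!["fb-2".to_string()]);
}
```

Note that, per the caveat above, "all peers ok" here means the application-level write succeeded; it does not prove transport-level delivery.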